package com.dc.schedule.client;


import com.dc.schedule.api.model.ClientConfig;
import com.dc.schedule.api.model.SocketClientRegistryMessage;
import com.dc.schedule.api.model.SocketFireTaskMessage;
import com.dc.schedule.api.model.SocketMessage;
import com.dc.schedule.api.model.SocketStatusMessage;
import com.dc.schedule.api.model.StaticTaskRegistry;
import com.dc.schedule.api.model.StaticTaskRegistryConfig;
import com.dc.schedule.client.socket.ScheduleSocketClient;
import com.dc.schedule.client.trigger.ThreadPoolTaskTrigger;
import lombok.Getter;
import lombok.NoArgsConstructor;
import lombok.Setter;
import lombok.extern.slf4j.Slf4j;

import java.io.IOException;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.stream.Collectors;

@Slf4j
@NoArgsConstructor
public class ScheduleClient implements ScheduleEventListener {
    @Setter
    private ClientConfig clientConfig;
    @Setter
    private ScheduleSocketClient socketClient;
    @Setter
    private ExecutorService executorService;
    
    private ThreadPoolTaskTrigger threadPoolTaskTrigger;
    
    @Getter
    @Setter
    private List<StaticTaskRegistryConfig> staticTasks;
    
    public ScheduleClient(
            ClientConfig clientConfig,
            ScheduleSocketClient socketClient,
            ExecutorService executorService,
            List<StaticTaskRegistryConfig> staticTasks) {
        this.clientConfig = clientConfig;
        this.socketClient = socketClient;
        this.executorService = executorService;
        this.staticTasks = staticTasks;
    }
    
    public void start() {
        socketClient.setEndpoints(clientConfig.getEndpoints());
        socketClient.setScheduleEventListener(this);
        this.threadPoolTaskTrigger = new ThreadPoolTaskTrigger(
                executorService,
                staticTasks,
                socketClient
        );
        log.debug("Schedule Client Start");
        socketClient.start();
        log.info("Schedule Client Started");
    }
    
    private void registry() {
        final List<StaticTaskRegistry> staticTaskRegistries =
                staticTasks
                        .stream()
                        .map(StaticTaskRegistryConfig::getRegistry)
                        .collect(Collectors.toList());
        try {
            final SocketClientRegistryMessage socketClientRegistryMessage = new SocketClientRegistryMessage();
            socketClientRegistryMessage.setClientId(clientConfig.getClientId());
            socketClientRegistryMessage.setAppName(clientConfig.getAppName());
            socketClientRegistryMessage.setStaticTasks(staticTaskRegistries);
            log.info("注册当前客户端: {}", socketClientRegistryMessage);
            final SocketStatusMessage response =
                    (SocketStatusMessage) socketClient.sendMessageForResponse(socketClientRegistryMessage);
            if (!response.isSuccess()) {
                throw new IllegalStateException("客户端注册失败: " + response.getMessage());
            } else {
                log.info("注册成功: {}", socketClientRegistryMessage);
            }
        } catch (IOException e) {
            throw new IllegalStateException("注册服务器失败", e);
        } catch (InterruptedException ignored) {
        }
    }
    
    public void shutdown() {
        log.debug("Schedule Client shutting down");
        socketClient.shutdown();
        log.info("Schedule Client shut down");
    }
    
    @Override
    public void onConnect() {
        registry();
    }
    
    @Override
    public void onDisconnect() {
        log.info("client disconnect");
    }
    
    @Override
    public void onMessage(SocketMessage message) {
        log.debug("on message : {}", message);
        if (message instanceof SocketFireTaskMessage) {
            onSocketFireTaskMessage((SocketFireTaskMessage) message);
        }
    }
    
    public void onSocketFireTaskMessage(SocketFireTaskMessage message) {
        Exception ex = null;
        try {
            threadPoolTaskTrigger.fireTask(message.getTaskContext());
        } catch (Exception e) {
            log.error("任务发起失败", e);
            ex = e;
        }
        try {
            if (ex != null) {
                socketClient.sendMessage(SocketStatusMessage.error(message.getRequestUid(), ex.getMessage()));
            } else {
                socketClient.sendMessage(SocketStatusMessage.ok(message.getRequestUid()));
            }
        } catch (IOException e) {
            throw new IllegalStateException("回执发送失败", e);
        } catch (InterruptedException ignored) {
        }
        
    }
}
